這章的重點是大數據相關的處理。儘管如此,本章使用的範例資料是來自 U.S. Internal Revenue Service 的收入與稅收的關聯性資料,只有約 100,000 行(約 100 MB)。相關的程式碼請從 Chapter 05 下載。
對於一般的資料,可以整個讀進去記憶體再做檢視:
(defn ex-5-1 []
(-> (slurp "data/soi.csv")
(str/split #"\n")
(first)))
但是,在數據比記憶體大的情況下,全部載入就變成無法執行的策略。若我們使用 Clojure 的特殊能力 lazy sequences(跟 Python 的 yield
類似的概念),我們就能夠只讀取第一行:
(defn ex-5-2 []
(-> (io/reader "data/soi.csv")
(line-seq)
(first)))
這裡使用 clojure.java.io
庫返回檔案的引用參考(reference
),接著使用 clojure.core
的功能 lazy-seq
返回一個惰性求值的字串列表,然後只讀取第一個。
結果可以看到,檔案有 77 個 column。對於接下來的分析有用處的資料欄位大致是:
STATEFIPS
:Federal Information Processing System codeSTATE
:美國各州簡稱zipcode
:五碼郵遞區號AGI_STUB
:調整後收入分級以及一些跟納稅人個資有關的,例如撫養資料等。
具體有多少行?
(defn ex-5-3 []
(-> (io/reader "data/soi.csv")
(line-seq)
(count)))
執行後的結果是 166,905。不過,count
後面是 Java,純 Clojure 的看起來是:
(defn ex-5-4 []
(->> (io/reader "data/soi.csv")
(line-seq)
(reduce (fn [i x] (inc 1)) 0)))
reducers
函式庫reduce
是比較低效率的實現。對於支持併發的 Clojure,有個庫叫做 reducers
,包括在 clojure.core.reducers
。他的原理是通過將 sequences 分成多個 reduce
去執行指定的計算再合併。
;; import clojure.core.reducers as r
(defn ex-5-5 []
(->> (io/reader "data/soi.csv")
(line-seq)
(r/fold + (fn [1 x] (inc 1)))))
可以直接傳遞 +
而不包含初始值,是因為不傳遞參數給 +
會返回 0 而若只有一個參數,則返回該參數本身。預設的 reducers
會把 sequence 拆成 512 個 subsequence,然後再合併。
iota
載入大型資料iota 是一個 reducers
的配套資料庫,主體部分是 Java 的 NIO mmap()
,用來載入遠比記憶體大的資料作運算。在專案中加入並引入,我們就可以改寫我們的程式:
(defn ex-5-7 []
(->> (iota/seq "data/soi.csv")
(r/fold + (fn [1 x] (inc 1)))))
不過,這部分也僅只是計算行數。我們需要將資料清理函數接上 reducers
reducers
資料處理流程(pipeline)首先將應該是數字的字串轉成浮點數。接著使用 reducers
版本的 map
將此轉換函數套用到每一行上。最後的 into
則把 reducers
內部結構轉換為 vector。另外,在函數 ex-5-8
中,第一行被丟棄,但實際上它正是 column,我們可以把它保留下以便重複利用,而不要用 r/drop
丟棄之。
(defn parse-double [x]
(Double/parseDouble x))
(defn parse-line [line]
(let [[text-fields double-fields] (->> (str/split line #",") (split-at 2))]
(concat text-fields
(map parse-double double-fields))))
(defn parse-columns [line]
(->> (str/split line #",")
(map keyword)))
(defn ex-5-9 []
(let [data (iota/seq "data/soi.csv")
column-names (parse-columns (first data))]
(->> (r/drop 1 data)
(r/map parse-line)
(r/map (fn [fields] (zipmap column-names fields)))
(r/take 1)
(into []))))
;; [{:N2 1505430.0, :A19300 181519.0, :MARS4 256900.0 ...}]